﻿using System;
using System.Collections;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using UnityEngine;


public class MyUdpClientConn
{
    
    // Compile-time switch for the verbose Debug.Log tracing used throughout this class.
    private const bool Log_Enable = true;

    // Label prefixed to every log message written by this connection.
    private readonly string m_Name;
    // Queue supplied by the owner; completed packets are enqueued here under a lock on the queue (see EnqueueMessage).
    private readonly Queue<SocketHelper.ThreadMessage> m_ThreadMessageQueue;
    // Remote endpoint: datagrams are sent to it, and received datagrams from any other endpoint are ignored.
    private IPEndPoint m_ServerIpPoint;
    // Local port the socket is bound to in Start().
    public int m_BindPort = 8081;

    // Non-null between a successful Start() and the following Stop().
    private Socket m_Socket;
    // Set by Stop(), polled by both worker loops, cleared again by the next Start().
    // NOTE(review): not volatile although it is written and read on different threads - confirm this is acceptable.
    private bool m_IsStopping;
    
    // Receive worker; the thread itself resets this field to null when it exits.
    private Thread m_RecvThread;
    // Incoming packets that are still being reassembled, keyed by msgId (used by ParseRecvData).
    private Dictionary<ushort, MyUdpPacket> m_RecvDict = new Dictionary<ushort, MyUdpPacket>();
    // Timestamp (SocketHelper.CurrentTimeMillis) stored when a datagram from the server is received.
    private long m_LastRecvTime;
    // Timestamp of the most recent ping attempt made by CheckPing.
    private long m_LastPingTime;
    private int m_PingNum; // Ping counter; reset to 0 whenever a datagram from the server is received

    // msgId assigned to the next outgoing message (post-incremented by SendLoop).
    private ushort m_SendId = 0;
    // Send worker; the thread itself resets this field to null when it exits.
    private Thread m_SendThread;
    // Outgoing messages waiting to be sliced and sent; accessed under a lock on the queue itself.
    private Queue<string> m_SendQueue = new Queue<string>();
    // Sent packets waiting for the remote acknowledgement, keyed by msgId; accessed under a lock on the dictionary.
    private Dictionary<ushort, MyUdpPacket> m_WaitRemoteResponseDict = new Dictionary<ushort, MyUdpPacket>();
    // The same pending packets in list form, scanned by CheckReSend; finished entries are set to null and compacted later.
    private List<MyUdpPacket> m_WaitRemoteResponseList = new List<MyUdpPacket>();

    private Queue<int> m_ResponseQueue = new Queue<int>(); // Responses still to be sent; each entry is the first 4 header bytes of the slice being answered

    /// <summary>
    /// Creates a connection object in the stopped state. No socket or thread is created
    /// until <see cref="Start"/> is called.
    /// </summary>
    /// <param name="name">Label used as the prefix of every log line.</param>
    /// <param name="msgQueue">Queue that receives fully reassembled packets.</param>
    public MyUdpClientConn(string name, Queue<SocketHelper.ThreadMessage> msgQueue)
    {
        this.m_ThreadMessageQueue = msgQueue;
        this.m_Name = name;
    }

    /// <summary>Stores the remote endpoint used for sending and for filtering received datagrams.</summary>
    /// <param name="endPoint">Address and port of the server.</param>
    public void SetServerAddress(IPEndPoint endPoint) => m_ServerIpPoint = endPoint;

    /// <summary>
    /// Opens the UDP socket, binds it to 127.0.0.1:<see cref="m_BindPort"/> and starts the
    /// receive and send worker threads. Does nothing while a socket or a worker thread from
    /// a previous session still exists.
    /// </summary>
    /// <exception cref="SocketException">Binding failed (for example the port is in use).
    /// The socket is closed before the exception propagates, so Start() can be retried.</exception>
    public void Start()
    {
        if (null != m_Socket || null != m_RecvThread || null != m_SendThread) return;
        if (m_IsStopping)
        {
            // Restart after a Stop(): drop everything that belonged to the previous session.
            m_IsStopping = false;
            m_LastPingTime = 0;
            m_PingNum = 0;

            if (Log_Enable) Debug.Log($"{m_Name}: send:{m_SendQueue.Count}, waitACK:{m_WaitRemoteResponseDict.Count}, sendResp:{m_ResponseQueue.Count}");
            m_SendQueue.Clear();
            m_WaitRemoteResponseDict.Clear();
            m_WaitRemoteResponseList.Clear();
            m_ResponseQueue.Clear();
            // Half-reassembled packets of the old session must not be mixed with new msgIds.
            m_RecvDict.Clear();
        }

        // Publish the socket in m_Socket only after Bind succeeded. Otherwise a failed Bind
        // would leave a non-null, unbound socket behind and every later Start() would return early.
        var socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
        try
        {
            socket.Bind(new IPEndPoint(IPAddress.Loopback, m_BindPort)); // port used for both sending and listening
        }
        catch
        {
            socket.Close();
            throw;
        }
        m_Socket = socket;

        m_LastRecvTime = SocketHelper.CurrentTimeMillis();

        m_RecvThread = new Thread(OnReceiveThread);
        m_RecvThread.IsBackground = true;
        m_RecvThread.Start(socket);

        m_SendThread = new Thread(OnSendThread);
        m_SendThread.IsBackground = true;
        m_SendThread.Start(socket);
    }

    /// <summary>
    /// Lifecycle state: 1 = started (socket exists), 0 = stopped (no socket and both worker
    /// threads gone), -1 = stopping (socket released but a worker thread has not exited yet).
    /// </summary>
    public int Status
    {
        get
        {
            if (m_Socket != null)
            {
                return 1;
            }

            bool workersGone = m_RecvThread == null && m_SendThread == null;
            return workersGone ? 0 : -1;
        }
    }

    /// <summary>True while the socket exists, i.e. between Start() and Stop().</summary>
    public bool IsStarted => m_Socket != null;

    /// <summary>True when there is no socket and both worker threads have exited.</summary>
    public bool IsStopped
    {
        get
        {
            if (m_Socket != null) return false;
            if (m_RecvThread != null) return false;
            return m_SendThread == null;
        }
    }

    /// <summary>
    /// Queues a message for the send thread. The call is ignored when the send thread is not
    /// running or when <paramref name="str"/> is null.
    /// </summary>
    /// <param name="str">Message text; it is UTF-8 encoded and split into slices by the send thread.</param>
    public void Send(string str)
    {
        // The send loop reads a null head element as "queue is empty" and never dequeues it,
        // so a single null entry would block every message queued after it.
        if (null == str)
        {
            Debug.LogWarning($"{m_Name}: Send(null) ignored");
            return;
        }

        if (null == m_SendThread)
            return;

        lock (m_SendQueue)
        {
            m_SendQueue.Enqueue(str);
            if (1 == m_SendQueue.Count)
            {
                Monitor.Pulse(m_SendQueue); // wakes the send thread if it is waiting on the queue
            }
        }
    }

    /// <summary>
    /// Requests shutdown: raises the stopping flag, interrupts both worker threads and closes
    /// the socket. The worker threads clear their own thread fields when they exit, so
    /// <see cref="IsStopped"/> may still be false right after this call returns.
    /// </summary>
    public void Stop()
    {
        if (m_Socket == null || m_IsStopping)
        {
            return;
        }

        // Raise the flag before waking the workers so that they see it when they resume.
        m_IsStopping = true;

        m_RecvThread?.Interrupt();
        m_SendThread?.Interrupt();

        SocketHelper.ShutdownAndClose(m_Socket);
        m_Socket = null;
    }

    //******************** Worker threads

    /// <summary>
    /// Entry point of the receive thread. Runs <see cref="RecvLoop"/> and always clears
    /// <c>m_RecvThread</c> on exit, including when the loop ends with an exception, so that
    /// the connection can reach the stopped state and be started again.
    /// </summary>
    /// <param name="obj">The <see cref="Socket"/> created by Start().</param>
    private void OnReceiveThread(object obj)
    {
        try
        {
            RecvLoop((Socket)obj);
        }
        catch (Exception ex)
        {
            Debug.LogError($"{m_Name}: recvThread crashed: {ex.GetType().Name}: {ex.Message}");
        }
        finally
        {
            m_RecvThread = null;
            if (Log_Enable) Debug.Log($"{m_Name}: recvThread exit: tid:{Thread.CurrentThread.ManagedThreadId}");
        }
    }

    /// <summary>
    /// Receive loop: blocks in ReceiveFrom, ignores datagrams that do not come from the
    /// configured server endpoint, and passes every complete slice found in the buffer to
    /// <see cref="ParseRecvData"/>. Ends when the stopping flag is set, when the socket has
    /// been disposed, when the thread is interrupted, or when the buffer has no room left.
    /// </summary>
    /// <param name="socket">The bound UDP socket.</param>
    private void RecvLoop(Socket socket)
    {
        byte[] buff = new byte[UdpHelper.UDP_Slice_Head_Len + UdpHelper.UDP_Slice_Max_Body_Size];
        int notReadLen = 0; // bytes at the start of buff that have not been consumed yet
        EndPoint remoteEP = new IPEndPoint(0, 8000);
        while (!m_IsStopping)
        {
            try
            {
                int buffRemainBytes = buff.Length - notReadLen;
                if (buffRemainBytes <= 0)
                {
                    Debug.LogError($"{m_Name}: buffRemainBytes <= 0, local:{socket.LocalEndPoint}");
                    break; // treated as a disconnect
                }

                int recvLen = socket.ReceiveFrom(buff, notReadLen, buffRemainBytes, 0, ref remoteEP); // the blocking call is aborted when the socket is shut down or closed
                if (!remoteEP.Equals(m_ServerIpPoint))
                    continue;

                // Take the timestamp after the blocking call returned. A value sampled before
                // ReceiveFrom would be older by however long the thread waited for data.
                m_LastRecvTime = SocketHelper.CurrentTimeMillis();
                m_PingNum = 0;
                if (0 == recvLen) // FIN
                {
                    //todo: the server closed the connection
                    if (Log_Enable) Debug.Log($"{m_Name}: FIN: local:{socket.LocalEndPoint}");
                    continue;
                }

                notReadLen += recvLen;

                int readedLen = 0;
                while (UdpHelper.IsUdpSliceReady(buff, readedLen, notReadLen, out var opCode, out var bodyLen))
                {
                    ParseRecvData(socket, buff, remoteEP, readedLen, opCode, bodyLen);

                    notReadLen -= UdpHelper.UDP_Slice_Head_Len;
                    notReadLen -= bodyLen;

                    readedLen += UdpHelper.UDP_Slice_Head_Len;
                    readedLen += bodyLen;
                }

                if (notReadLen > 0)
                {
                    // NOTE(review): leftover bytes are kept and the next datagram is appended
                    // after them (stream-style handling). Confirm this is intended for UDP;
                    // the original todo says packet coalescing should not occur here.
                    if (Log_Enable) Debug.Log($"{m_Name}: RecvThread: readedLen:{readedLen}, notReadLen:{notReadLen}, local:{socket.LocalEndPoint}");
                    Buffer.BlockCopy(buff, readedLen, buff, 0, notReadLen);
                }
            }
            catch (ObjectDisposedException ex)
            {
                // A disposed socket cannot be used again; retrying would only spin and flood the log.
                Debug.LogWarning($"{m_Name}: RecvThread: connected:{socket.Connected}, {ex.GetType().Name}: {ex.Message}");
                break;
            }
            catch (ThreadInterruptedException ex)
            {
                // Raised by the Interrupt() call in Stop(); this is a normal shutdown path.
                Debug.LogWarning($"{m_Name}: RecvThread: {ex.GetType().Name}: {ex.Message}");
                break;
            }
            catch (ThreadAbortException ex)
            {
                Debug.LogWarning($"{m_Name}: RecvThread: connected:{socket.Connected}, {ex.GetType().Name}: {ex.Message}");
            }
            catch (SocketException ex)
            {
                Debug.LogWarning($"{m_Name}: RecvThread: connected:{socket.Connected}, {ex.GetType().Name}:{ex.ErrorCode}, {ex.Message}");
            }
            catch (Exception ex)
            {
                Debug.LogError($"{m_Name}: RecvThread: connected:{socket.Connected}, {ex.GetType().Name}: {ex.Message}");
                Debug.LogError($"{m_Name}: {ex.StackTrace}");
            }
        }
    }

    /// <summary>
    /// Handles one complete slice located at <paramref name="readedLen"/> in <paramref name="buff"/>.
    /// Header layout as used in this file: msgId (2 bytes at +0), opCode (1 byte at +2),
    /// bodyLen (1 byte at +3), sliceIndex (2 bytes at +4), sliceCount (2 bytes at +6).
    /// opCode 0 = data slice: queue an acknowledgement and store the slice for reassembly.
    /// opCode 1 = the remote side acknowledged one of our slices.
    /// opCode 2 = ping: queue a pong.
    /// </summary>
    /// <param name="socket">Receiving socket (used for logging only).</param>
    /// <param name="buff">Receive buffer.</param>
    /// <param name="remoteEP">Sender of the datagram (used for logging only).</param>
    /// <param name="readedLen">Offset in <paramref name="buff"/> where the slice header starts.</param>
    /// <param name="opCode">opCode already extracted by the caller.</param>
    /// <param name="bodyLen">Body length already extracted by the caller.</param>
    private void ParseRecvData(Socket socket, byte[] buff, EndPoint remoteEP, int readedLen, int opCode, int bodyLen)
    {
        // All header fields are read relative to the slice start. Reading them at offset 0
        // returns the header of the first slice whenever this slice is not the first one
        // in the buffer, while the body copy below already uses readedLen.
        ushort msgId = BitConverter.ToUInt16(buff, readedLen);
        int sliceIndex = BitConverter.ToInt16(buff, readedLen + 4);
        int sliceCount = BitConverter.ToInt16(buff, readedLen + 6);
        if (Log_Enable) Debug.Log($"{m_Name}: recv slice ready: msgId:{msgId}, opCode:{opCode}, bodyLen:{bodyLen} <-{remoteEP}");

        switch (opCode)
        {
        case 0: // data slice
        {
            // Queue the "received" acknowledgement; the send thread transmits it.
            lock (m_ResponseQueue)
            {
                int v = BitConverter.ToInt32(buff, readedLen);
                m_ResponseQueue.Enqueue(v);
            }

            if (!m_RecvDict.TryGetValue(msgId, out var udpPacket))
            {
                // sliceCount comes from the network: a negative value would make the array
                // allocation throw, and zero would leave an entry that can never complete.
                if (sliceCount <= 0)
                {
                    Debug.LogWarning($"{m_Name}: invalid sliceCount:{sliceCount}");
                    return;
                }

                udpPacket = new MyUdpPacket();
                udpPacket.m_Slices = new MyUdpSlice[sliceCount];
                m_RecvDict.Add(msgId, udpPacket);
            }

            if (sliceIndex < 0 || sliceIndex >= udpPacket.m_Slices.Length)
            {
                Debug.LogWarning($"{m_Name}: invalid sliceIndex:{sliceIndex}");
                return;
            }

            var slice = udpPacket.m_Slices[sliceIndex];
            if (null == slice)
            {
                slice = new MyUdpSlice();
                udpPacket.m_Slices[sliceIndex] = slice;
            }

            slice.m_BodyLen = bodyLen;
            Buffer.BlockCopy(buff, readedLen, slice.m_Data, 0, UdpHelper.UDP_Slice_Head_Len + bodyLen);
            slice.m_IsRemoteResp = true;
            slice.m_LastSentTime = SocketHelper.CurrentTimeMillis();
            if (udpPacket.CheckAllReady(out var str))
            {
                var msg = new SocketHelper.ThreadMessage();
                msg.m_Cmd = "Packet";
                msg.m_Content = str;
                EnqueueMessage(msg);

                m_RecvDict.Remove(msgId);
            }
        }
        break;

        case 1: // the remote side acknowledged one of our slices
        {
            if (Log_Enable) Debug.Log($"{m_Name}: server response received: msgId:{msgId}, slice:{sliceIndex} <-{remoteEP}");
            lock (m_WaitRemoteResponseDict)
            {
                if (m_WaitRemoteResponseDict.TryGetValue(msgId, out var packet))
                {
                    if (sliceIndex < 0 || sliceIndex >= packet.m_Slices.Length)
                    {
                        Debug.LogWarning($"{m_Name}: invalid sliceIndex:{sliceIndex}");
                        return;
                    }

                    var slice = packet.m_Slices[sliceIndex];
                    if (null != slice)
                    {
                        slice.m_IsRemoteResp = true;
                        slice.m_LastSentTime = SocketHelper.CurrentTimeMillis();
                    }
                }
                else
                {
                    Debug.LogWarning($"{m_Name}: invalid msgId:{msgId}");
                }
            }
        }
        break;

        case 2: // heartbeat ping
        {
            if (Log_Enable) Debug.Log($"{m_Name}: recv Ping, {remoteEP}, local:{socket.LocalEndPoint}");
            // Queue the pong; the send thread transmits it.
            lock (m_ResponseQueue)
            {
                int v = BitConverter.ToInt32(buff, readedLen);
                m_ResponseQueue.Enqueue(v);
            }
        }
        break;
        }
    }

    /// <summary>
    /// Entry point of the send thread. Runs <see cref="SendLoop"/> and always clears
    /// <c>m_SendThread</c> on exit, including when the loop ends with an exception, so that
    /// the connection can reach the stopped state and be started again.
    /// </summary>
    /// <param name="obj">The <see cref="Socket"/> created by Start().</param>
    private void OnSendThread(object obj)
    {
        try
        {
            SendLoop((Socket)obj);
        }
        catch (Exception ex)
        {
            Debug.LogError($"{m_Name}: sendThread crashed: {ex.GetType().Name}: {ex.Message}");
        }
        finally
        {
            m_SendThread = null;
            if (Log_Enable) Debug.Log($"{m_Name}: sendThread exit: tid:{Thread.CurrentThread.ManagedThreadId}");
        }
    }

    /// <summary>
    /// Send loop. Each iteration peeks the next queued message, runs the resend check,
    /// flushes pending acknowledgements/pongs, then slices the message, registers it as
    /// waiting for acknowledgement and transmits every slice. The message is removed from
    /// the queue afterwards whether or not sending succeeded. The queue is polled; the loop
    /// sleeps about 3 ms when it is empty.
    /// </summary>
    /// <param name="socket">The bound UDP socket.</param>
    private void SendLoop(Socket socket)
    {
        byte[] buff = new byte[UdpHelper.UDP_Slice_Head_Len];
        while (!m_IsStopping)
        {
            long t1 = SocketHelper.CurrentTimeMillis();
            string jsonStr = null;
            // Tracked separately from jsonStr so that a null entry is not mistaken for an
            // empty queue; that mistake would leave the entry at the head forever.
            bool hasMessage = false;
            lock (m_SendQueue)
            {
                if (m_SendQueue.Count > 0)
                {
                    jsonStr = m_SendQueue.Peek();
                    hasMessage = true;
                }
            }

            CheckReSend(socket);
            CheckResponseQueue(socket, buff);
            // CheckPing(socket, buff) is intentionally not called at the moment.

            try
            {
                if (!hasMessage)
                {
                    long t2 = SocketHelper.CurrentTimeMillis();
                    if (t2 - t1 < 3)
                    {
                        Thread.Sleep(3);
                    }
                    continue;
                }

                if (string.IsNullOrEmpty(jsonStr))
                {
                    // Nothing to put on the wire. Registering a packet with zero slices
                    // would only create an entry for the resend check to clean up.
                    Debug.LogWarning($"{m_Name}: skip null/empty message");
                }
                else
                {
                    var strBytes = Encoding.UTF8.GetBytes(jsonStr);
                    int strLen = strBytes.Length;
                    int maxBody = UdpHelper.UDP_Slice_Max_Body_Size;
                    // Integer ceiling division; float division loses precision for large lengths.
                    int sliceCount = (strLen + maxBody - 1) / maxBody;
                    if (sliceCount > short.MaxValue)
                    {
                        // sliceIndex and sliceCount are 16-bit fields in the header.
                        Debug.LogError($"{m_Name}: message too large: bytes:{strLen}, slices:{sliceCount}");
                    }
                    else
                    {
                        var packet = new MyUdpPacket();
                        packet.m_Slices = new MyUdpSlice[sliceCount];
                        ushort msgId = m_SendId++;
                        int offset = 0;
                        for (int i = 0; i < sliceCount; ++i)
                        {
                            var slice = new MyUdpSlice();
                            packet.m_Slices[i] = slice;

                            int bodyLen = Math.Min(maxBody, strLen - offset);
                            slice.m_BodyLen = bodyLen;
                            // header
                            SocketHelper.ConvertInt16To2Bytes((short)msgId, slice.m_Data, 0);
                            slice.m_Data[2] = 0; // opCode: normal message
                            // Length of THIS slice's body. Writing the total message length
                            // here is only correct for single-slice messages.
                            slice.m_Data[3] = (byte)(bodyLen & 0xff);
                            SocketHelper.ConvertInt16To2Bytes((short)i, slice.m_Data, 4);
                            SocketHelper.ConvertInt16To2Bytes((short)sliceCount, slice.m_Data, 6);
                            // body
                            Buffer.BlockCopy(strBytes, offset, slice.m_Data, UdpHelper.UDP_Slice_Head_Len, bodyLen);

                            offset += bodyLen;
                        }

                        // Register in the dictionary first: Add throws when the msgId is still
                        // pending (after the 16-bit id wrapped), and in that case the packet
                        // must not be left in the resend list.
                        lock (m_WaitRemoteResponseDict)
                        {
                            m_WaitRemoteResponseDict.Add(msgId, packet);
                        }
                        m_WaitRemoteResponseList.Add(packet);

                        for (int i = 0; i < sliceCount; ++i)
                        {
                            var slice = packet.m_Slices[i];
                            if (Log_Enable) Debug.Log($"{m_Name}: startSend: slice:{i} ->{m_ServerIpPoint}");
                            SendData(socket, slice.m_Data, UdpHelper.UDP_Slice_Head_Len + slice.m_BodyLen);
                            slice.m_LastSentTime = SocketHelper.CurrentTimeMillis();
                        }
                        if (Log_Enable) Debug.Log($"{m_Name}: allSlice sendEnd");
                        if (Log_Enable) Debug.Log("");
                    }
                }
            }
            catch (ThreadInterruptedException ex)
            {
                Debug.LogWarning($"{m_Name}: {ex.Message}\n{ex.StackTrace}");
                break;
            }
            catch (Exception ex)
            {
                Debug.LogError($"{m_Name}: {ex.GetType().Name}:{ex.Message}");
            }

            // The message is removed whether or not it was sent successfully.
            lock (m_SendQueue)
            {
                if (m_SendQueue.Count > 0)
                    m_SendQueue.Dequeue();
            }
        }
    }

    //检查我发的消息对方是否回应已收到
    /// <summary>
    /// Scans the packets that wait for a remote acknowledgement. A slice that has not been
    /// acknowledged for 5 seconds is sent again, at most 5 times. A packet is dropped from
    /// the waiting list and dictionary once all its slices are acknowledged or any slice
    /// has used up its retries. The list is compacted when at least 20% of it is null.
    /// </summary>
    /// <param name="socket">The bound UDP socket.</param>
    private void CheckReSend(Socket socket)
    {
        long now = SocketHelper.CurrentTimeMillis();
        int nullElementCount = 0;
        for (int i = 0; i < m_WaitRemoteResponseList.Count; ++i)
        {
            var udpPacket = m_WaitRemoteResponseList[i];
            if (null == udpPacket)
            {
                nullElementCount++;
                continue; //todo: move null elements to the end
            }

            int notReadyNum = 0;
            int retryFailNum = 0;
            for (int j = 0; j < udpPacket.m_Slices.Length; ++j)
            {
                var slice = udpPacket.m_Slices[j];
                if (null == slice || slice.m_IsRemoteResp)
                {
                    continue;
                }

                notReadyNum++;
                long elapseTime = now - slice.m_LastSentTime;
                if (elapseTime >= 5 * 1000) // time to resend
                {
                    if (slice.m_ReSendCount >= 5)
                    {
                        retryFailNum++; //todo: report the resend failure
                        Debug.LogError($"{m_Name}: reSend fail: {udpPacket.GetMsgId()}");
                    }
                    else
                    {
                        slice.m_ReSendCount++;
                        // This method runs outside the try/catch of the send loop, so a socket
                        // error here would end the send thread. Handle it the same way the
                        // response and ping senders do: log it and carry on.
                        try
                        {
                            SendData(socket, slice.m_Data, UdpHelper.UDP_Slice_Head_Len + slice.m_BodyLen);
                            if (Log_Enable) Debug.Log($"{m_Name}: reSend slice:{j}, elapse:{elapseTime}, local:{socket.LocalEndPoint}");
                        }
                        catch (Exception ex)
                        {
                            Debug.LogError($"{m_Name}: reSend slice:{j} error: {ex.GetType().Name}:{ex.Message}");
                        }

                        // Updated after a failed attempt as well, so the next retry waits the
                        // full interval instead of using up all retries within a few loop passes.
                        now = SocketHelper.CurrentTimeMillis();
                        slice.m_LastSentTime = now;
                    }
                }
            }

            if (notReadyNum <= 0 || retryFailNum > 0)
            {
                m_WaitRemoteResponseList[i] = null;
                lock (m_WaitRemoteResponseDict)
                {
                    m_WaitRemoteResponseDict.Remove(udpPacket.GetMsgId());
                }
            }
        }

        // An empty list has nothing to compact; this also avoids computing 0/0.
        int total = m_WaitRemoteResponseList.Count;
        if (total > 0 && (float)nullElementCount / total >= 0.2f)
        {
            SocketHelper.RemoveNullElements(m_WaitRemoteResponseList);
        }
    }

    //别人发过来的消息要回应, 比如: 消息已收到, PONG
    /// <summary>
    /// Sends every queued response to the server: an acknowledgement (opCode 1) for a
    /// received data slice (opCode 0) and a pong (opCode 3) for a received ping (opCode 2).
    /// Each response is a bare header carrying the original msgId.
    /// </summary>
    /// <param name="socket">The bound UDP socket.</param>
    /// <param name="buff">Scratch buffer of header size; its content is overwritten.</param>
    private void CheckResponseQueue(Socket socket, byte[] buff)
    {
        while (true)
        {
            int v;
            // Hold the lock only for the dequeue. Sending under the lock would block the
            // receive thread, which enqueues into the same queue, for the whole duration of
            // the socket call and the sleep inside SendData.
            lock (m_ResponseQueue)
            {
                if (m_ResponseQueue.Count <= 0)
                {
                    break;
                }
                v = m_ResponseQueue.Dequeue();
            }

            SocketHelper.ConvertInt32To4Bytes(v, buff, 0);
            switch (buff[2]) // opCode
            {
            case 0: // data slice -> "received" acknowledgement
                buff[2] = 1;
                break;

            case 2: // ping -> pong
                buff[2] = 3;
                break;
            }
            buff[3] = 0;
            // sliceIndex
            // NOTE(review): always written as 0 because the queue keeps only the first four
            // header bytes; confirm how the peer identifies which slice is acknowledged.
            buff[4] = 0;
            buff[5] = 0;
            // sliceCount
            buff[6] = 0;
            buff[7] = 0;

            ushort msgId = BitConverter.ToUInt16(buff, 0);
            try
            {
                if (Log_Enable) Debug.Log($"{m_Name}: response:{buff[2]}, msgId:{msgId} start: local:{socket.LocalEndPoint}");
                SendData(socket, buff, buff.Length);
            }
            catch (Exception ex)
            {
                Debug.LogError($"{m_Name}: {ex.GetType().Name}:{ex.Message}");
            }
        }
    }

    /// <summary>
    /// Heartbeat: once nothing has been received for 20 seconds, sends a ping (opCode 2) at
    /// most once per second. When the ping counter has reached 3 no further ping is sent and
    /// an error is logged instead.
    /// </summary>
    /// <param name="socket">The bound UDP socket.</param>
    /// <param name="buff">Scratch buffer of header size; its content is overwritten.</param>
    private void CheckPing(Socket socket, byte[] buff)
    {
        long current = SocketHelper.CurrentTimeMillis();

        bool recentlyReceived = current - m_LastRecvTime < 20 * 1000;
        if (recentlyReceived) return;

        bool recentlyPinged = current - m_LastPingTime < 1000;
        if (recentlyPinged) return;

        m_LastPingTime = current;
        if (m_PingNum >= 3)
        {
            // The third ping got no answer: treat the server as disconnected.
            Debug.LogError($"{m_Name}: have disconnect: Ping Fail, {m_PingNum}");
            return;
        }

        m_PingNum++;

        // Header: msgId(2) = 0, opCode(1) = 2, bodyLen(1) = 0, sliceIndex(2) = 0, sliceCount(2) = 0.
        for (int k = 0; k < 8; ++k)
        {
            buff[k] = (byte)(k == 2 ? 2 : 0);
        }

        if (Log_Enable) Debug.Log($"{m_Name}: sendPingStart: opCode:{buff[2]} ->{m_ServerIpPoint}");
        try
        {
            SendData(socket, buff, buff.Length);
        }
        catch (Exception ex)
        {
            Debug.LogError($"{m_Name}: {ex.GetType().Name}:{ex.Message}");
            Debug.LogError($"{ex.StackTrace}");
        }
    }

    /// <summary>
    /// Sends the first <paramref name="dataLen"/> bytes of <paramref name="data"/> to the
    /// server endpoint, calling SendTo again for the remainder when a call reports fewer
    /// bytes than requested. Sleeps 1 ms after every SendTo call.
    /// </summary>
    /// <param name="socket">The bound UDP socket.</param>
    /// <param name="data">Buffer holding the bytes to send, starting at index 0.</param>
    /// <param name="dataLen">Number of bytes to send.</param>
    private void SendData(Socket socket, byte[] data, int dataLen)
    {
        int sentSoFar = 0;
        while (true)
        {
            int sentNow = socket.SendTo(data, sentSoFar, dataLen - sentSoFar, SocketFlags.None, m_ServerIpPoint);
            if (sentNow <= 0) // nothing went out; the connection may be gone
            {
                Debug.LogError($"{m_Name}: sendLen <= 0: local:{socket.LocalEndPoint}");
                return;
            }

            sentSoFar += sentNow;
            Thread.Sleep(1);

            if (sentSoFar >= dataLen)
            {
                return;
            }
        }
    }

    /// <summary>
    /// Hands a message to the owner's queue, unless the connection is stopping, in which
    /// case the message is discarded with a warning.
    /// </summary>
    /// <param name="msg">Message to deliver.</param>
    private void EnqueueMessage(SocketHelper.ThreadMessage msg)
    {
        // Open question from the original author: this thread may still read false while
        // the render thread is writing true, so the consumer can later see a message that
        // belongs to a connection already being closed. Is that a problem?
        if (!m_IsStopping)
        {
            lock (m_ThreadMessageQueue)
            {
                m_ThreadMessageQueue.Enqueue(msg);
            }
            return;
        }

        Debug.LogWarning($"{m_Name}: have close: msg discard:{msg.m_Cmd}");
    }

    //********************
}
